use halo_model::{ExtensionGVK, ExtensionOperator, GroupVersionKind, GVK};
use once_cell::sync::Lazy;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fmt::Display;
use std::slice::Iter;
use std::sync::{Arc, RwLock};
use tokio::sync::Mutex;

use crate::config::app_error::AppError::SchemeNotFoundErr;
use crate::config::app_error::{AppError, AppResult};
use crate::extension::index::index::index_spec_registry::IndexSpecRegistry;
use crate::extension::index::index::index_specs::{DefaultIndexSpecs, IndexSpecs};
use crate::extension::index::scheme_watcher_manager::{
    DefaultSchemeWatcherManager, SchemeRegistered, SchemeWatcher, SchemeWatcherManager,
};
use crate::{get_database_service, get_scheme_manager};

pub static SCHEMES: Lazy<Mutex<Vec<Scheme>>> = Lazy::new(|| Mutex::new(Vec::new()));

#[derive(Debug, Serialize, Deserialize, Default, Clone, PartialEq, Eq, Hash)]
pub struct Scheme {
    pub group_version_kind: GroupVersionKind,
    pub plural: String,
    pub singular: String,
    pub open_api_schema: Value,
}

impl Scheme {
    pub async fn new<T: GVK>() -> Scheme {
        Scheme {
            group_version_kind: GroupVersionKind {
                group: T::group().to_string(),
                version: T::version().to_string(),
                kind: T::kind().to_string(),
            },
            plural: T::plural().to_string(),
            singular: T::singular().to_string(),
            open_api_schema: Default::default(),
        }
    }
}

pub struct SchemeManager {
    // 使用 RwLock 提供内部可变性，支持多线程安全的读写操作
    schemes: RwLock<Vec<Scheme>>,
    // pub watcher_manager: DefaultSchemeWatcherManager,
}

impl SchemeManager {
    pub fn new() -> SchemeManager {
        SchemeManager {
            schemes: RwLock::new(Vec::new()),
            // watcher_manager: DefaultSchemeWatcherManager::new(),
        }
    }
    fn register_scheme<T: ExtensionGVK>(&self, scheme: Scheme) -> AppResult<()> {
        let schemes = self.schemes.read()
            .map_err(|_| AppError::InternalServerError("Failed to acquire read lock on schemes".to_string()))?;

        if !schemes.contains(&scheme) {
            // 释放读锁，获取写锁
            drop(schemes);

            let mut schemes = self.schemes.write()
                .map_err(|_| AppError::InternalServerError("Failed to acquire write lock on schemes".to_string()))?;

            // 双重检查，防止在锁切换期间其他线程已经添加了相同的scheme
            if !schemes.contains(&scheme) {
                let mut index_spec_registry= IndexSpecRegistry::new();
                index_spec_registry.index_for::<T>(&scheme)?;
                schemes.push(scheme.clone());
                // self.get_watchers()
                //     .for_each(|t| t.on_change(Box::new(SchemeRegistered::new(scheme.clone()))));
            }
        }
        Ok(())
    }

    pub async fn register<T: ExtensionGVK>(&self) -> AppResult<()> {
        let scheme = Self::convert_scheme::<T>();

        self.register_scheme::<T>(scheme)
    }

    pub async fn register_fun<F, T>(&self, f: F) -> AppResult<()>
    where
        F: Fn(&mut DefaultIndexSpecs) -> AppResult<()>,
        T: ExtensionGVK,
    {
        let scheme = Self::convert_scheme::<T>();

        self.register_fn::<F, T>(scheme, f)
    }

    fn convert_scheme<T: ExtensionGVK>() -> Scheme {
        let scheme = Scheme {
            group_version_kind: GroupVersionKind {
                group: T::group().to_string(),
                version: T::version().to_string(),
                kind: T::kind().to_string(),
            },
            plural: T::plural().to_string(),
            singular: T::singular().to_string(),
            open_api_schema: Value::Null,
        };
        scheme
    }

    fn register_fn<F, T>(&self, scheme: Scheme, f: F) -> AppResult<()>
    where
        F: Fn(&mut DefaultIndexSpecs) -> AppResult<()>,
        T: ExtensionGVK,
    {
        let schemes = self.schemes.read()
            .map_err(|_| AppError::InternalServerError("Failed to acquire read lock on schemes".to_string()))?;

        if schemes.contains(&scheme) {
            return Ok(());
        }

        // 释放读锁，获取写锁
        drop(schemes);

        let mut schemes = self.schemes.write()
            .map_err(|_| AppError::InternalServerError("Failed to acquire write lock on schemes".to_string()))?;

        // 双重检查
        if schemes.contains(&scheme) {
            return Ok(());
        }

        let mut index_spec_registry  = IndexSpecRegistry::new();
        index_spec_registry.index_for::<T>(&scheme)?;
        index_spec_registry.consumer::<F>(&scheme, f)?;
        schemes.push(scheme.clone());
        // self.get_watchers()
        //     .for_each(|t| t.on_change(Box::new(SchemeRegistered::new(scheme.clone()))));
        Ok(())
    }

    pub fn unregister<T: ExtensionOperator>(&self, scheme: &Scheme) -> AppResult<()> {
        let schemes = self.schemes.read()
            .map_err(|_| AppError::InternalServerError("Failed to acquire read lock on schemes".to_string()))?;

        if schemes.contains(scheme) {
            // 释放读锁，获取写锁
            drop(schemes);

            let mut schemes = self.schemes.write()
                .map_err(|_| AppError::InternalServerError("Failed to acquire write lock on schemes".to_string()))?;

            // self.index_spec_registry.remove_index_specs(scheme);
            schemes.retain(|s| s != scheme);
            // self.get_watchers()
            //     .for_each(|t| t.on_change(Box::new(SchemeUnregistered::new(scheme.clone()))))
        }
        Ok(())
    }

    pub fn schemes(&self) -> AppResult<Vec<Scheme>> {
        let schemes = self.schemes.read()
            .map_err(|_| AppError::InternalServerError("Failed to acquire read lock on schemes".to_string()))?;
        Ok(schemes.clone())
    }

    // fn get_watchers(&self) -> Iter<Box<dyn SchemeWatcher>> {
    //     self.watcher_manager.watchers()
    // }

    pub fn size(&self) -> AppResult<usize> {
        let schemes = self.schemes.read()
            .map_err(|_| AppError::InternalServerError("Failed to acquire read lock on schemes".to_string()))?;
        Ok(schemes.len())
    }

    pub fn fetch(&self, gvk: &GroupVersionKind) -> AppResult<Option<Scheme>> {
        let schemes = self.schemes.read()
            .map_err(|_| AppError::InternalServerError("Failed to acquire read lock on schemes".to_string()))?;
        Ok(schemes.iter().find(|t| t.group_version_kind == *gvk).cloned())
    }

    pub fn get(&self, gvk: &GroupVersionKind) -> AppResult<Scheme> {
        self.fetch(gvk)?
            .ok_or(AppError::SchemeNotFoundErr(gvk.clone()))
    }

    pub async fn get_by_type<T: GVK>(&self) -> AppResult<Scheme> {
        let gvk = GroupVersionKind::new(
            T::group().to_string(),
            T::version().to_string(),
            T::kind().to_string(),
        );
        self.get(&gvk)
    }
}


pub async fn build_store_name_str<T: GVK>(name: &String) -> AppResult<String> {
    let scheme_manager = get_scheme_manager!()?;

    let scheme = scheme_manager.get_by_type::<T>().await?;

    let store_name = build_store_name(&scheme, name);

    Ok(store_name)
}

pub async fn get_scheme<T: GVK>() -> AppResult<(String, Scheme)> {
    let scheme_manager = get_scheme_manager!()?;
    let scheme = scheme_manager.get_by_type::<T>().await?;

    let store_name = build_store_name_prefix(&scheme);

    Ok((store_name, scheme))
}


pub fn build_store_name_prefix(scheme: &Scheme) -> String {
    // rule of key: /registry/[group]/plural-name/extension_index-name
    if scheme.group_version_kind.group.is_empty() {
        return format!("/registry/{}", scheme.plural,);
    }
    let key = format!(
        "/registry/{}/{}",
        scheme.group_version_kind.group, scheme.plural,
    );
    key
}
pub fn build_store_name(scheme: &Scheme, name: &String) -> String {
    // rule of key: /registry/[group]/plural-name/extension_index-name
    if scheme.group_version_kind.group.is_empty() {
        return format!("/registry/{}/{}", scheme.plural, name);
    }
    let key = format!(
        "/registry/{}/{}/{}",
        scheme.group_version_kind.group, scheme.plural, name
    );
    return key;
}

